/*
 * Copyright 2019 Spotify AB.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package com.spotify.scio.testing

import com.google.common.util.concurrent.{Futures, ListenableFuture}
import com.spotify.scio._
import com.spotify.scio.transforms.GuavaAsyncLookupDoFn
import com.spotify.scio.io._
import com.spotify.scio.transforms.DoFnWithResource.ResourceType
import com.spotify.scio.transforms.{BaseAsyncLookupDoFn, GuavaAsyncDoFn}
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.Pipeline.PipelineExecutionException
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment
import org.apache.beam.sdk.metrics.DistributionResult
import org.apache.beam.sdk.{io => beam}
import org.scalatest.exceptions.TestFailedException

import scala.io.Source
import org.apache.beam.sdk.metrics.{Counter, Distribution, Gauge}
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.values.KV

import java.io.PrintStream
import scala.collection.mutable.ListBuffer

/**
 * A [[PrintStream]] test double that records everything written to it.
 *
 * The bytes themselves are discarded by a no-op in-memory sink rather than being sent to
 * `/dev/null`, so the mock also works on platforms without that device and never keeps a
 * file handle open. Every `write(buf, off, len)` call appends one chunk, decoded with the
 * platform default charset, to `message`; callers usually inspect `message.mkString("")`.
 */
class MockedPrintStream
    extends PrintStream(new java.io.OutputStream {
      // Swallow every byte: only the recording done in MockedPrintStream.write matters.
      override def write(b: Int): Unit = ()
    })
    with Serializable {

  /** Chunks written so far, in write order. */
  val message: ListBuffer[String] = ListBuffer[String]()

  override def write(buf: Array[Byte], off: Int, len: Int): Unit = {
    super.write(buf, off, len)
    message += new String(buf, off, len)
  }
}

/**
 * Minimal text-in/text-out job exercised by the `JobTest` specs in this file: it appends
 * "X" to every input line.
 */
object TextFileJob {

  // Reads `input` as text, appends "X" to each line and saves the result to `output`.
  // The definition between the markers below is kept verbatim because it is a doc snippet.
  // #JobTestTest_io_pipeline_section
  def pipeline(sc: ScioContext, input: String, output: String): Unit = {
    sc.textFile(input)
      .map(_ + "X")
      .saveAsTextFile(output)
  }
  // #JobTestTest_io_pipeline_section

  /** Builds the pipeline from the `input` and `output` command-line arguments and runs it. */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    pipeline(sc, args("input"), args("output"))
    sc.run()
    // Explicit unit so the value returned by run() is not silently discarded.
    ()
  }
}

/**
 * Job that concatenates every input line with every line of a distributed-cache file.
 *
 * Reads the `input`, `output` and `distCache` command-line arguments.
 */
object DistCacheJob {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    // Read the cache file eagerly and always close it: `getLines().toSeq` on its own can be
    // lazy (a `Stream` on Scala 2.12) and never releases the underlying file handle.
    // The explicit type argument keeps the cached value typed as `Seq[String]`.
    val dc = sc.distCache[Seq[String]](args("distCache")) { f =>
      val source = Source.fromFile(f)
      try source.getLines().toList
      finally source.close()
    }
    sc.textFile(args("input"))
      .flatMap(x => dc().map(x + _))
      .saveAsTextFile(args("output"))
    sc.run()
    ()
  }
}

/**
 * Job whose text read is applied under the explicit name "ReadInput" (via `withName`);
 * the lines are then saved unchanged to the `output` argument.
 */
object SourceTransformOverrideJob {
  /** Wires the named read of `input` to a text sink at `output` and runs the pipeline. */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    // #JobTestTest_example_source
    sc.withName("ReadInput")
      .textFile(args("input"))
      // #JobTestTest_example_source
      .saveAsTextFile(args("output"))
    sc.run()
    ()
  }
}

/**
 * Job that parses each input line as an `Int` and turns it back into a `String` through an
 * async `DoFn` applied under the name "myTransform".
 */
object TransformOverrideJob {

  /** Reads `input`, applies the named transform and saves the result to `output`. */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (context, opts) = ContextAndArgs(cmdlineArgs)

    // Async DoFn returning an immediate future holding the string form of its input.
    val stringify: GuavaAsyncDoFn[Int, String, Unit] =
      new GuavaAsyncDoFn[Int, String, Unit]() {
        override def createResource(): Unit = ()
        override def getResourceType: ResourceType = ResourceType.PER_CLASS
        override def processElement(input: Int): ListenableFuture[String] =
          Futures.immediateFuture(input.toString)
      }

    val numbers = context
      .textFile(opts("input"))
      .map(_.toInt)
    numbers
      .applyTransform("myTransform", ParDo.of(stringify))
      .saveAsTextFile(opts("output"))

    context.run()
    ()
  }
}

/**
 * Async lookup `DoFn` shared by the transform-override jobs: the "lookup" of an `Int` is
 * simply its string representation, and the client/session is `Unit`.
 */
private[testing] class GuavaLookupDoFn extends GuavaAsyncLookupDoFn[Int, String, Unit]() {
  // No real client is required for this lookup.
  override protected def newClient(): Unit = ()

  override def getResourceType: ResourceType = ResourceType.PER_INSTANCE

  override def asyncLookup(session: Unit, input: Int): ListenableFuture[String] = {
    val rendered = input.toString
    Futures.immediateFuture(rendered)
  }
}

/**
 * Job that parses input lines as `Int`, applies [[GuavaLookupDoFn]] under the name
 * "myTransform" and saves the looked-up value of every resulting `KV`.
 */
object TransformOverrideKVJob {
  /** Reads `input`, applies the named lookup and saves the unwrapped values to `output`. */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    // #JobTestTest_example_kv
    sc.textFile(args("input"))
      .map(_.toInt)
      .applyTransform("myTransform", ParDo.of(new GuavaLookupDoFn))
      .map((i: KV[Int, BaseAsyncLookupDoFn.Try[String]]) => i.getValue.get())
      .saveAsTextFile(args("output"))
    // #JobTestTest_example_kv
    sc.run()
    ()
  }
}

/**
 * Job that wraps a lookup plus post-processing in a single composite transform named
 * "myTransform" (via `withName` + `transform`).
 */
object TransformOverrideIterJob {
  /**
   * Reads `input` as `Int`s, applies the composite transform and saves its output to `output`.
   *
   * NOTE(review): inside the composite, `flatMap` is applied to the looked-up `String`, which
   * presumably yields its individual characters before they are mapped back to strings —
   * confirm this is intended.
   */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.textFile(args("input"))
      .map(_.toInt)
      // #JobTestTest_example_iter
      .withName("myTransform")
      .transform { c: SCollection[Int] =>
        c.applyTransform(ParDo.of(new GuavaLookupDoFn))
          .flatMap(_.getValue.get())
          .map(_.toString)
      }
      // #JobTestTest_example_iter
      .saveAsTextFile(args("output"))
    sc.run()
    ()
  }
}

/**
 * Job whose composite transform "myTransform" expands every input `n` into `0 until n`
 * and applies [[GuavaLookupDoFn]] to each expanded element; the looked-up values are saved.
 */
object TransformOverrideIterKVJob {
  /** Reads `input`, applies the named composite transform and saves the values to `output`. */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.textFile(args("input"))
      // #JobTestTest_example_mock_iter
      .map(_.toInt)
      .transform("myTransform")(
        _.flatMap(0 until _).applyTransform(ParDo.of(new GuavaLookupDoFn))
      )
      // #JobTestTest_example_mock_iter
      .map(_.getValue.get())
      .saveAsTextFile(args("output"))
    sc.run()
    // Explicit unit, like every other job in this file, so the value returned by run()
    // is not silently discarded (relevant under -Ywarn-value-discard).
    ()
  }
}

/**
 * Job that first applies [[GuavaLookupDoFn]] without an explicit name and then applies a
 * second async `DoFn`, named "myTransform", whose input is the `KV` produced by the lookup.
 */
object TransformOverrideKVJobFail {

  /** Reads `input`, chains the two transforms and saves the result to `output`. */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (context, opts) = ContextAndArgs(cmdlineArgs)

    // Unwraps the lookup result carried by each KV and returns it as an immediate future.
    val unwrapLookup: GuavaAsyncDoFn[KV[Int, BaseAsyncLookupDoFn.Try[String]], String, Unit] =
      new GuavaAsyncDoFn[KV[Int, BaseAsyncLookupDoFn.Try[String]], String, Unit]() {
        override def createResource(): Unit = ()
        override def getResourceType: ResourceType = ResourceType.PER_CLASS
        override def processElement(
          input: KV[Int, BaseAsyncLookupDoFn.Try[String]]
        ): ListenableFuture[String] =
          Futures.immediateFuture(input.getValue.get())
      }

    val lookedUp = context
      .textFile(opts("input"))
      .map(_.toInt)
      .applyTransform(ParDo.of(new GuavaLookupDoFn))
    lookedUp
      .applyTransform("myTransform", ParDo.of(unwrapLookup))
      .saveAsTextFile(opts("output"))

    context.run()
    ()
  }
}

/** Job that both materializes its text input and saves it unchanged as text. */
object MaterializeJob {

  /** Reads `input`, calls `materialize` on it, saves it to `output` and runs the pipeline. */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (context, opts) = ContextAndArgs(cmdlineArgs)
    val lines = context.textFile(opts("input"))
    // The materialized handle is not used; only the call itself matters to the spec.
    lines.materialize
    lines.saveAsTextFile(opts("output"))
    context.run()
    ()
  }
}

/**
 * Job that reads and writes through raw Beam `TextIO` transforms registered as the custom
 * input "TextIn" and the custom output "TextOut"; each line is parsed as an `Int` and
 * multiplied by 10.
 */
object CustomIOJob {

  /** Builds the Beam read/write transforms from `input`/`output` and runs the pipeline. */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (context, opts) = ContextAndArgs(cmdlineArgs)
    val source = beam.TextIO.read().from(opts("input"))
    val sink = beam.TextIO.write().to(opts("output"))

    val scaled = context
      .customInput("TextIn", source)
      .map(_.toInt)
      .map(_ * 10)
    scaled
      .map(_.toString)
      .saveAsCustomOutput("TextOut", sink)

    context.run()
    ()
  }
}

/** Job that treats every input line as a file to read (`readFiles`) and saves the contents. */
object ReadAllJob {

  /** Reads the paths listed in `input`, reads those files and saves their lines to `output`. */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (context, opts) = ContextAndArgs(cmdlineArgs)
    val paths = context.textFile(opts("input"))
    paths
      .readFiles
      .saveAsTextFile(opts("output"))
    context.run()
    ()
  }
}

/**
 * Job that treats every input line as a file to read as bytes (`readFilesAsBytes`) and
 * saves each byte array decoded as a `String`.
 */
object ReadAllBytesJob {

  /** Reads the paths listed in `input` and saves the decoded contents to `output`. */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (context, opts) = ContextAndArgs(cmdlineArgs)
    val paths = context.textFile(opts("input"))
    paths
      .readFilesAsBytes
      // Decoded with the platform default charset, as `new String(bytes)` does.
      .map(bytes => new String(bytes))
      .saveAsTextFile(opts("output"))
    context.run()
    ()
  }
}

/**
 * Job that treats every input line as a file path, reads each file through a Beam
 * `TextSource` and saves every record prefixed with the path it came from.
 */
object ReadAllWithPathJob {

  /** Reads the paths listed in `input` and saves "path:record" lines to `output`. */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (context, opts) = ContextAndArgs(cmdlineArgs)
    // 64 MiB, passed as the first argument of readFilesWithPath.
    val bundleSizeBytes = 64L * 1024 * 1024
    val paths = context.textFile(opts("input"))
    val recordsWithPath = paths.readFilesWithPath(bundleSizeBytes) { path =>
      new beam.TextSource(
        StaticValueProvider.of(path),
        EmptyMatchTreatment.DISALLOW,
        Array('\n'.toByte),
        0
      )
    }
    recordsWithPath
      .map { case (path, record) => s"$path:$record" }
      .saveAsTextFile(opts("output"))
    context.run()
    ()
  }
}

/**
 * Job that builds a pipeline but never calls `run()` on its context; the spec below uses it
 * to check that `JobTest` reports "ScioContext was not executed".
 */
object JobWithoutRun {

  /** Counts the numbers 1 to 10 and registers a text output, without running the pipeline. */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (context, opts) = ContextAndArgs(cmdlineArgs)
    val total = context
      .parallelize(1 to 10)
      .count
    total
      .map(_.toString)
      .saveAsTextFile(opts("output"))
    ()
  }
}

/** Job that reads the same text input twice, used to check duplicate-read detection. */
object JobWithDuplicateInput {

  /** Reads `input` twice in a row and runs the pipeline. */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (context, opts) = ContextAndArgs(cmdlineArgs)
    val path = opts("input")
    context.textFile(path)
    context.textFile(path)
    context.run()
    ()
  }
}

/** Job that writes two collections to the same text output, used to check duplicate writes. */
object JobWithDuplicateOutput {

  /** Saves `1 to 10` and then `1 to 5` to the same `output` and runs the pipeline. */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (context, opts) = ContextAndArgs(cmdlineArgs)

    val firstBatch = context.parallelize(1 to 10)
    firstBatch.saveAsTextFile(opts("output"))

    val secondBatch = context.parallelize(1 to 5)
    secondBatch.saveAsTextFile(opts("output"))

    context.run()
    ()
  }
}

/**
 * Job that updates a counter, a distribution and a gauge once for every element of
 * `1L to 10L`. The metric handles are public so the spec can refer to them.
 */
object MetricsJob {
  val counter: Counter = ScioMetrics.counter("counter")
  val distribution: Distribution = ScioMetrics.distribution("distribution")
  val gauge: Gauge = ScioMetrics.gauge("gauge")

  /** Builds the metric-updating pipeline and runs it; no job arguments are read. */
  def main(cmdlineArgs: Array[String]): Unit = {
    val (context, _) = ContextAndArgs(cmdlineArgs)
    val numbers = context.parallelize(1L to 10L)
    // The mapped collection is not consumed; the map exists only to touch the metrics.
    numbers.map { value =>
      counter.inc()
      distribution.update(value)
      gauge.set(value)
      value
    }
    context.run()
    ()
  }
}

class JobTestTest extends PipelineSpec {
  /** Runs [[TextFileJob]] on the lines "a", "b", "c" and checks the output against `xs`. */
  def testTextFileJob(xs: String*): Unit = {
    val builder = JobTest[TextFileJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), Seq("a", "b", "c"))
    builder
      .output(TextIO("out.txt"))(_ should containInAnyOrder(xs))
      .run()
  }

  // Happy path: every transformed line is expected.
  it should "pass correct TextIO" in {
    val expected = Seq("aX", "bX", "cX")
    testTextFileJob(expected: _*)
  }

  // Both a missing and a surplus expected element must surface as an AssertionError.
  it should "fail incorrect TextIO" in {
    val tooFew = Seq("aX", "bX")
    val tooMany = Seq("aX", "bX", "cX", "dX")
    an[AssertionError] should be thrownBy { testTextFileJob(tooFew: _*) }
    an[AssertionError] should be thrownBy { testTextFileJob(tooMany: _*) }
  }

  // Exercises the JobTest overload that takes a function of ScioContext instead of a job
  // type: TextFileJob.pipeline is partially applied with fixed input/output paths.
  it should "execute anonymous job" in {
    import TextFileJob.pipeline
    // #JobTestTest_anonymous_job_test
    JobTest(pipeline(_, "in.txt", "out.txt"))
      .input(TextIO("in.txt"), Seq("a", "b", "c"))
      .output(TextIO("out.txt"))(_ should containInAnyOrder(Seq("aX", "bX", "cX")))
      .run()
    // #JobTestTest_anonymous_job_test
  }

  /**
   * Runs [[DistCacheJob]] with input lines "a", "b" and dist-cache content "1", "2", and
   * checks the output against `xs`.
   */
  def testDistCacheJob(xs: String*): Unit = {
    val builder = JobTest[DistCacheJob.type]
      .args("--input=in.txt", "--output=out.txt", "--distCache=dc.txt")
      .input(TextIO("in.txt"), Seq("a", "b"))
      .distCache(DistCacheIO("dc.txt"), Seq("1", "2"))
    builder
      .output(TextIO("out.txt"))(_ should containInAnyOrder(xs))
      .run()
  }

  // Happy path: the cross product of input lines and cache lines.
  it should "pass correct DistCacheIO" in {
    val expected = Seq("a1", "a2", "b1", "b2")
    testDistCacheJob(expected: _*)
  }

  // Both a missing and surplus expected elements must surface as an AssertionError.
  it should "fail incorrect DistCacheIO" in {
    val tooFew = Seq("a1", "a2", "b1")
    val tooMany = Seq("a1", "a2", "b1", "b2", "c3", "d4")
    an[AssertionError] should be thrownBy { testDistCacheJob(tooFew: _*) }
    an[AssertionError] should be thrownBy { testDistCacheJob(tooMany: _*) }
  }

  /**
   * Runs [[CustomIOJob]] with "1", "2", "3" supplied for the custom input "TextIn" and
   * checks what reaches the custom output "TextOut" against `xs`.
   */
  def testCustomIOJob(xs: String*): Unit = {
    val inputLines = Seq("1", "2", "3")
    val builder = JobTest[CustomIOJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(CustomIO[String]("TextIn"), inputLines)
    builder
      .output(CustomIO[String]("TextOut"))(_ should containInAnyOrder(xs))
      .run()
  }

  // Happy path: every input multiplied by 10.
  it should "pass correct CustomIO" in {
    val expected = Seq("10", "20", "30")
    testCustomIOJob(expected: _*)
  }

  // Both a missing and a surplus expected element must surface as an AssertionError.
  it should "fail incorrect CustomIO" in {
    val tooFew = Seq("10", "20")
    val tooMany = Seq("10", "20", "30", "40")
    an[AssertionError] should be thrownBy { testCustomIOJob(tooFew: _*) }
    an[AssertionError] should be thrownBy { testCustomIOJob(tooMany: _*) }
  }

  /**
   * Runs [[ReadAllJob]] where "in.txt" lists the paths "a" and "b", each backed by a
   * `ReadIO` test input, and checks the output against `xs`.
   */
  def testReadAllJob(xs: String*): Unit = {
    val builder = JobTest[ReadAllJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), Seq("a", "b"))
      .input(ReadIO("a"), Seq("a1", "a2"))
      .input(ReadIO("b"), Seq("b1", "b2"))
    builder
      .output(TextIO("out.txt"))(_ should containInAnyOrder(xs))
      .run()
  }

  // Happy path: the lines of both referenced files.
  it should "pass correct string ReadIO" in {
    testReadAllJob("a1", "a2", "b1", "b2")
  }

  // Expectations that miss elements or contain extra ones must fail; named "fail incorrect"
  // like the equivalent TextIO, DistCacheIO and CustomIO tests in this spec.
  it should "fail incorrect string ReadIO" in {
    an[AssertionError] should be thrownBy { testReadAllJob("a1", "a2") }
    an[AssertionError] should be thrownBy {
      testReadAllJob("a1", "a2", "b1", "b2", "c1")
    }
  }

  // Supplying a TestStream for a ReadIO input is expected to fail at pipeline execution.
  it should "fail string ReadIO used with TestStream input" in {
    val stream = testStreamOf[String]
      .addElements("a1", "a2")
      .advanceWatermarkToInfinity()

    val thrown = the[PipelineExecutionException] thrownBy {
      JobTest[ReadAllJob.type]
        .args("--input=in.txt", "--output=out.txt")
        .input(TextIO("in.txt"), Seq("a"))
        .inputStream(ReadIO("a"), stream)
        .output(TextIO("out.txt"))(_ => ())
        .run()
    }

    val expectedMessage =
      s"java.lang.UnsupportedOperationException: Test input TestStream(${stream.getEvents}) " +
        s"can't be converted to Iterable[T] to test this ScioIO type"
    thrown should have message expectedMessage
  }

  /**
   * Runs [[ReadAllBytesJob]] where "in.txt" lists the paths "a" and "b", each backed by a
   * `ReadIO` test input of byte arrays, and checks the decoded output against `xs`.
   */
  def testReadAllBytesJob(xs: String*): Unit = {
    val contentOfA = Seq("a1", "a2").map(_.getBytes)
    val contentOfB = Seq("b1", "b2").map(_.getBytes)
    val builder = JobTest[ReadAllBytesJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), Seq("a", "b"))
      .input(ReadIO("a"), contentOfA)
      .input(ReadIO("b"), contentOfB)
    builder
      .output(TextIO("out.txt"))(_ should containInAnyOrder(xs))
      .run()
  }

  // Happy path: the decoded contents of both referenced files.
  it should "pass correct bytes ReadIO" in {
    testReadAllBytesJob("a1", "a2", "b1", "b2")
  }

  // Expectations that miss elements or contain extra ones must fail; named "fail incorrect"
  // like the equivalent TextIO, DistCacheIO and CustomIO tests in this spec.
  it should "fail incorrect bytes ReadIO" in {
    an[AssertionError] should be thrownBy { testReadAllBytesJob("a1", "a2") }
    an[AssertionError] should be thrownBy {
      testReadAllBytesJob("a1", "a2", "b1", "b2", "c1")
    }
  }

  // Supplying a TestStream for a ReadIO input is expected to fail at pipeline execution.
  it should "fail bytes ReadIO used with TestStream input" in {
    val stream = testStreamOf[Array[Byte]]
      .addElements("a1".getBytes, "a2".getBytes)
      .advanceWatermarkToInfinity()

    val thrown = the[PipelineExecutionException] thrownBy {
      JobTest[ReadAllBytesJob.type]
        .args("--input=in.txt", "--output=out.txt")
        .input(TextIO("in.txt"), Seq("a"))
        .inputStream(ReadIO("a"), stream)
        .output(TextIO("out.txt"))(_ => ())
        .run()
    }

    val expectedMessage =
      s"java.lang.UnsupportedOperationException: Test input TestStream(${stream.getEvents}) " +
        s"can't be converted to Iterable[T] to test this ScioIO type"
    thrown should have message expectedMessage
  }

  /**
   * Runs [[ReadAllWithPathJob]] where "in1.txt" lists the paths "a" and "b", each backed by
   * a `ReadIO` test input, and checks the "path:record" output against `xs`.
   */
  def testReadAllWithPathJob(xs: String*): Unit = {
    val builder = JobTest[ReadAllWithPathJob.type]
      .args("--input=in1.txt", "--output=out.txt")
      .input(TextIO("in1.txt"), Seq("a", "b"))
      .input(ReadIO("a"), Seq("a1", "a2"))
      .input(ReadIO("b"), Seq("b1", "b2"))
    builder
      .output(TextIO("out.txt"))(_ should containInAnyOrder(xs))
      .run()
  }

  // Happy path: every record prefixed with the path it was read from.
  it should "pass correct path ReadIO" in {
    testReadAllWithPathJob("a:a1", "a:a2", "b:b1", "b:b2")
  }

  // Expectations that miss elements or contain extra ones must fail; named "fail incorrect"
  // like the equivalent TextIO, DistCacheIO and CustomIO tests in this spec.
  it should "fail incorrect path ReadIO" in {
    an[AssertionError] should be thrownBy { testReadAllWithPathJob("a:a1", "a:a2") }
    an[AssertionError] should be thrownBy {
      testReadAllWithPathJob("a:a1", "a:a2", "b:b1", "b:b2", "c:c1")
    }
  }

  // Supplying a TestStream for a ReadIO input is expected to fail at pipeline execution.
  // NOTE(review): the stream is typed Array[Byte] although ReadAllWithPathJob reads text
  // records; the element type presumably does not matter for this failure — confirm.
  it should "fail path ReadIO used with TestStream input" in {
    val stream = testStreamOf[Array[Byte]]
      .addElements("a1".getBytes, "a2".getBytes)
      .advanceWatermarkToInfinity()

    val thrown = the[PipelineExecutionException] thrownBy {
      JobTest[ReadAllWithPathJob.type]
        .args("--input=in.txt", "--output=out.txt")
        .input(TextIO("in.txt"), Seq("a"))
        .inputStream(ReadIO("a"), stream)
        .output(TextIO("out.txt"))(_ => ())
        .run()
    }

    val expectedMessage =
      s"java.lang.UnsupportedOperationException: Test input TestStream(${stream.getEvents}) " +
        s"can't be converted to Iterable[T] to test this ScioIO type"
    thrown should have message expectedMessage
  }

  // =======================================================================
  // Handling incorrect test wiring
  // =======================================================================

  // The job reads TextIO(in.txt) but no test input is registered at all.
  it should "fail missing test input" in {
    val thrown = the[IllegalArgumentException] thrownBy {
      JobTest[DistCacheJob.type]
        .args("--input=in.txt", "--output=out.txt", "--distCache=dc.txt")
        .distCache(DistCacheIO("dc.txt"), Seq("1", "2"))
        .output(TextIO("out.txt"))(_ should containInAnyOrder(Seq("a1", "a2", "b1", "b2")))
        .run()
    }
    thrown should have message
      "requirement failed: Missing test input: TextIO(in.txt), available: []"
  }

  // A test input is registered, but under a different path than the one the job reads.
  it should "fail misspelled test input" in {
    val thrown = the[IllegalArgumentException] thrownBy {
      JobTest[DistCacheJob.type]
        .args("--input=in.txt", "--output=out.txt", "--distCache=dc.txt")
        .input(TextIO("bad-in.txt"), Seq("a", "b"))
        .distCache(DistCacheIO("dc.txt"), Seq("1", "2"))
        .output(TextIO("out.txt"))(_ should containInAnyOrder(Seq("a1", "a2", "b1", "b2")))
        .run()
    }
    thrown should have message
      "requirement failed: Missing test input: TextIO(in.txt), available: [TextIO(bad-in.txt)]"
  }

  // An extra test input that the job never reads.
  it should "fail unmatched test input" in {
    val thrown = the[IllegalArgumentException] thrownBy {
      JobTest[DistCacheJob.type]
        .args("--input=in.txt", "--output=out.txt", "--distCache=dc.txt")
        .input(TextIO("in.txt"), Seq("a", "b"))
        .input(TextIO("unmatched.txt"), Seq("X", "Y"))
        .distCache(DistCacheIO("dc.txt"), Seq("1", "2"))
        .output(TextIO("out.txt"))(_ should containInAnyOrder(Seq("a1", "a2", "b1", "b2")))
        .run()
    }
    thrown should have message
      "requirement failed: Unmatched test input: TextIO(unmatched.txt)"
  }

  // The same test input registered twice on the builder.
  it should "fail duplicate test input" in {
    val thrown = the[IllegalArgumentException] thrownBy {
      JobTest[DistCacheJob.type](enforceRun = false)
        .args("--input=in.txt", "--output=out.txt", "--distCache=dc.txt")
        .input(TextIO("in.txt"), Seq("a", "b"))
        .input(TextIO("in.txt"), Seq("X", "Y"))
        .distCache(DistCacheIO("dc.txt"), Seq("1", "2"))
        .output(TextIO("out.txt"))(_ should containInAnyOrder(Seq("a1", "a2", "b1", "b2")))
        .run()
    }
    thrown should have message
      "requirement failed: Duplicate test input: TextIO(in.txt)"
  }

  // The job writes TextIO(out.txt) but no test output is registered at all.
  it should "fail missing test output" in {
    val thrown = the[IllegalArgumentException] thrownBy {
      JobTest[DistCacheJob.type]
        .args("--input=in.txt", "--output=out.txt", "--distCache=dc.txt")
        .input(TextIO("in.txt"), Seq("a", "b"))
        .distCache(DistCacheIO("dc.txt"), Seq("1", "2"))
        .run()
    }
    thrown should have message
      "requirement failed: Missing test output: TextIO(out.txt), available: []"
  }

  // A test output is registered, but under a different path than the one the job writes.
  it should "fail misspelled test output" in {
    val thrown = the[IllegalArgumentException] thrownBy {
      JobTest[DistCacheJob.type]
        .args("--input=in.txt", "--output=out.txt", "--distCache=dc.txt")
        .input(TextIO("in.txt"), Seq("a", "b"))
        .output(TextIO("bad-out.txt"))(_ should containInAnyOrder(Seq("a1", "a2", "b1", "b2")))
        .distCache(DistCacheIO("dc.txt"), Seq("1", "2"))
        .run()
    }
    thrown should have message
      "requirement failed: Missing test output: TextIO(out.txt), available: [TextIO(bad-out.txt)]"
  }

  // An extra test output that the job never writes.
  it should "fail unmatched test output" in {
    val thrown = the[IllegalArgumentException] thrownBy {
      JobTest[DistCacheJob.type]
        .args("--input=in.txt", "--output=out.txt", "--distCache=dc.txt")
        .input(TextIO("in.txt"), Seq("a", "b"))
        .distCache(DistCacheIO("dc.txt"), Seq("1", "2"))
        .output(TextIO("out.txt"))(_ should containInAnyOrder(Seq("a1", "a2", "b1", "b2")))
        .output(TextIO("unmatched.txt"))(_ should containInAnyOrder(Seq("X", "Y")))
        .run()
    }
    thrown should have message
      "requirement failed: Unmatched test output: TextIO(unmatched.txt)"
  }

  // The same test output registered twice on the builder.
  it should "fail duplicate test output" in {
    val thrown = the[IllegalArgumentException] thrownBy {
      JobTest[DistCacheJob.type](enforceRun = false)
        .args("--input=in.txt", "--output=out.txt", "--distCache=dc.txt")
        .input(TextIO("in.txt"), Seq("a", "b"))
        .distCache(DistCacheIO("dc.txt"), Seq("1", "2"))
        .output(TextIO("out.txt"))(_ should containInAnyOrder(Seq("a1", "a2", "b1", "b2")))
        .output(TextIO("out.txt"))(_ should containInAnyOrder(Seq("X", "Y")))
        .run()
    }
    thrown should have message
      "requirement failed: Duplicate test output: TextIO(out.txt)"
  }

  // The job uses DistCacheIO(dc.txt) but no test dist cache is registered at all.
  it should "fail missing test dist cache" in {
    val thrown = the[IllegalArgumentException] thrownBy {
      JobTest[DistCacheJob.type]
        .args("--input=in.txt", "--output=out.txt", "--distCache=dc.txt")
        .input(TextIO("in.txt"), Seq("a", "b"))
        .output(TextIO("out.txt"))(_ should containInAnyOrder(Seq("a1", "a2", "b1", "b2")))
        .run()
    }
    thrown should have message
      "requirement failed: Missing test dist cache: DistCacheIO(dc.txt), available: []"
  }

  // A test dist cache is registered, but under a different URI than the one the job uses.
  it should "fail misspelled test dist cache" in {
    val thrown = the[IllegalArgumentException] thrownBy {
      JobTest[DistCacheJob.type]
        .args("--input=in.txt", "--output=out.txt", "--distCache=dc.txt")
        .input(TextIO("in.txt"), Seq("a", "b"))
        .distCache(DistCacheIO("bad-dc.txt"), Seq("1", "2"))
        .output(TextIO("out.txt"))(_ should containInAnyOrder(Seq("a1", "a2", "b1", "b2")))
        .run()
    }
    thrown should have message
      "requirement failed: Missing test dist cache: DistCacheIO(dc.txt), available: " +
      "[DistCacheIO(bad-dc.txt)]"
  }

  // An extra test dist cache that the job never uses.
  it should "fail unmatched test dist cache" in {
    val thrown = the[IllegalArgumentException] thrownBy {
      JobTest[DistCacheJob.type]
        .args("--input=in.txt", "--output=out.txt", "--distCache=dc.txt")
        .input(TextIO("in.txt"), Seq("a", "b"))
        .distCache(DistCacheIO("dc.txt"), Seq("1", "2"))
        .distCache(DistCacheIO("unmatched.txt"), Seq("X", "Y"))
        .output(TextIO("out.txt"))(_ should containInAnyOrder(Seq("a1", "a2", "b1", "b2")))
        .run()
    }
    thrown should have message
      "requirement failed: Unmatched test dist cache: DistCacheIO(unmatched.txt)"
  }

  // The same test dist cache registered twice on the builder.
  it should "fail duplicate test dist cache" in {
    val thrown = the[IllegalArgumentException] thrownBy {
      JobTest[DistCacheJob.type](enforceRun = false)
        .args("--input=in.txt", "--output=out.txt", "--distCache=dc.txt")
        .input(TextIO("in.txt"), Seq("a", "b"))
        .distCache(DistCacheIO("dc.txt"), Seq("1", "2"))
        .distCache(DistCacheIO("dc.txt"), Seq("X", "Y"))
        .output(TextIO("out.txt"))(_ should containInAnyOrder(Seq("a1", "a2", "b1", "b2")))
        .run()
    }
    thrown should have message
      "requirement failed: Duplicate test dist cache: DistCacheIO(dc.txt)"
  }

  // MaterializeJob calls materialize, yet only the explicit text output has to be wired.
  it should "ignore materialize" in {
    val lines = Seq("a", "b")
    JobTest[MaterializeJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), lines)
      .output(TextIO("out.txt"))(_ should containInAnyOrder(Seq("a", "b")))
      .run()
  }

  // JobWithoutRun never runs its ScioContext, which JobTest must report.
  it should "fail job without run" in {
    val thrown = the[IllegalArgumentException] thrownBy {
      JobTest[JobWithoutRun.type]
        .args("--output=out.txt")
        .output(TextIO("out.txt"))(_ should containInAnyOrder(Seq("10")))
        .run()
    }
    thrown should have message
      "requirement failed: ScioContext was not executed. Did you forget .run()?"
  }

  // =======================================================================
  // Tests of JobTest testing wiring
  // =======================================================================

  /**
   * Nested spec whose `JobTest` builder (created from a job type) is deliberately never
   * `run()`. The outer "enforce run()" test executes it and looks for the
   * "Did you forget run()?" failure in the printed report.
   */
  class JobTestFromType extends PipelineSpec {
    "JobTestFromType" should "work" in {
      val expected = Seq("aX", "bX", "cX")
      JobTest[TextFileJob.type]
        .args("--input=in.txt", "--output=out.txt")
        .input(TextIO("in.txt"), Seq("a", "b", "c"))
        .output(TextIO("out.txt"))(_ should containInAnyOrder(expected))
    }
  }

  /**
   * Nested spec whose `JobTest` builder (created from a class-name string) is deliberately
   * never `run()`. The outer "enforce run()" test executes it and looks for the
   * "Did you forget run()?" failure in the printed report.
   */
  class JobTestFromString extends PipelineSpec {
    "JobTestFromString" should "work" in {
      val expected = Seq("aX", "bX", "cX")
      JobTest("com.spotify.scio.testing.TextFileJob")
        .args("--input=in.txt", "--output=out.txt")
        .input(TextIO("in.txt"), Seq("a", "b", "c"))
        .output(TextIO("out.txt"))(_ should containInAnyOrder(expected))
    }
  }

  /**
   * Nested spec that builds two `JobTest`s in one test and deliberately runs neither. The
   * outer "enforce run() on multi JobTest" test executes it and inspects the printed report.
   */
  class MultiJobTest extends PipelineSpec {
    "MultiJobTest" should "work" in {
      val firstExpected = Seq("aX", "bX", "cX")
      JobTest[TextFileJob.type]
        .args("--input=in.txt", "--output=out.txt")
        .input(TextIO("in.txt"), Seq("a", "b", "c"))
        .output(TextIO("out.txt"))(_ should containInAnyOrder(firstExpected))

      val secondExpected = Seq("dX", "eX", "fX")
      JobTest[TextFileJob.type]
        .args("--input=in2.txt", "--output=out2.txt")
        .input(TextIO("in2.txt"), Seq("d", "e", "f"))
        .output(TextIO("out2.txt"))(_ should containInAnyOrder(secondExpected))
    }
  }

  /**
   * Nested spec that goes through `com.spotify.scio.testing.JobTest` under an explicit alias
   * and never calls `run()`. The outer "not enforce run()" test executes it and expects the
   * missing-run message to be absent from the printed report.
   */
  class OriginalJobTest extends PipelineSpec {
    import com.spotify.scio.testing.{JobTest => InternalJobTest}
    "OriginalJobTest" should "work" in {
      val expected = Seq("aX", "bX", "cX")
      InternalJobTest[TextFileJob.type]
        .args("--input=in.txt", "--output=out.txt")
        .input(TextIO("in.txt"), Seq("a", "b", "c"))
        .output(TextIO("out.txt"))(_ should containInAnyOrder(expected))
    }
  }

  /**
   * Nested spec that passes an output assertion function returning `Unit` to the aliased
   * `com.spotify.scio.testing.JobTest`, without calling `run()`.
   *
   * NOTE(review): presumably this exists as a compile-time check of the `output` signature
   * (see the comment on the `.output` call) — confirm.
   */
  class JobTestWithNonUnitAssertion extends PipelineSpec {
    import com.spotify.scio.testing.{JobTest => InternalJobTest}
    "JobTestWithNonUnitAssertion" should "work" in {
      InternalJobTest[TextFileJob.type]
        .args("--input=in.txt", "--output=out.txt")
        .input(TextIO("in.txt"), Seq("a", "b", "c"))
        // warns given flag `-Ywarn-value-discard`
        // if `output` only accepts `SCollection[_] => Unit`
        // instead of `SCollection[_] => Assertion`
        .output(TextIO("out.txt"))(_ => ())
    }
  }

  // Regular expression (used with `include regex` below) for the report printed when a
  // nested spec forgets run(): the failed test line, the hint, and the description of the
  // un-run JobTest with its args, inputs and outputs, followed by the source location.
  // Literal parentheses, brackets, asterisks and the question mark are escaped; `\t` is
  // interpreted by the regex engine as a tab.
  private val runMissedMessage =
    """|- should work \*\*\* FAILED \*\*\*
       |  Did you forget run\(\)\?
       |  Missing run\(\): JobTest\[com.spotify.scio.testing.TextFileJob\]\(
       |  \targs: --input=in.txt --output=out.txt
       |  \tinputs: TextIO\(in.txt\)
       |  \toutputs: TextIO\(out.txt\)
       |  \) \(JobTestTest.scala:.*\)""".stripMargin

  /**
   * Evaluates `body` with `Console.out` redirected to a fresh [[MockedPrintStream]] and
   * returns everything that was printed, concatenated in write order.
   */
  private def printedBy(body: => Unit): String = {
    val sink = new MockedPrintStream
    Console.withOut(sink)(body)
    sink.message.mkString("")
  }

  it should "enforce run() on JobTest from class type" in {
    val report = printedBy {
      new JobTestFromType()
        .execute("JobTestFromType should work", color = false)
    }
    report should include regex runMissedMessage
  }

  it should "enforce run() on multi JobTest" in {
    val report = printedBy {
      new MultiJobTest().execute("MultiJobTest should work", color = false)
    }

    // Same pattern as runMissedMessage but without the trailing source location.
    val msg =
      """|- should work \*\*\* FAILED \*\*\*
         |  Did you forget run\(\)\?
         |  Missing run\(\): JobTest\[com.spotify.scio.testing.TextFileJob\]\(
         |  \targs: --input=in.txt --output=out.txt
         |  \tinputs: TextIO\(in.txt\)
         |  \toutputs: TextIO\(out.txt\)
         |  \)""".stripMargin

    report should include regex msg
  }

  it should "enforce run() on JobTest from string class" in {
    val report = printedBy {
      new JobTestFromString()
        .execute("JobTestFromString should work", color = false)
    }
    report should include regex runMissedMessage
  }

  it should "not enforce run() on internal JobTest" in {
    val report = printedBy {
      new OriginalJobTest()
        .execute("OriginalJobTest should work", color = false)
    }
    report shouldNot include regex runMissedMessage
  }

  // =======================================================================
  // Test invalid ScioIO
  // =======================================================================

  // JobWithDuplicateInput reads the same TextIO twice inside the job.
  it should "fail on duplicate usages of inputs in the job itself" in {
    val thrown = the[IllegalArgumentException] thrownBy {
      JobTest[JobWithDuplicateInput.type]
        .args("--input=input")
        .input(TextIO("input"), Seq("does", "not", "matter"))
        .run()
    }
    thrown should have message
      "requirement failed: Test input TextIO(input) has already been read from once."
  }

  // JobWithDuplicateOutput writes to the same TextIO twice inside the job.
  it should "fail on duplicate outputs in the job itself" in {
    val thrown = the[IllegalArgumentException] thrownBy {
      JobTest[JobWithDuplicateOutput.type]
        .args("--output=output")
        .output(TextIO("output"))(_ should containSingleValue("does not matter"))
        .run()
    }
    thrown should have message
      "requirement failed: Test output TextIO(output) has already been written to once."
  }

  // =======================================================================
  // Test metrics
  // =======================================================================

  // MetricsJob touches each metric once per element of 1L to 10L, hence count 10,
  // min 1, max 10 and sum 55 for the distribution.
  it should "pass correct metrics test" in {
    val expectedDistribution = DistributionResult.create(55, 10, 1, 10)

    JobTest[MetricsJob.type]
      .counter(MetricsJob.counter)(_ shouldBe 10)
      .counters(_ should contain(MetricsJob.counter.getName -> 10))
      .distribution(MetricsJob.distribution) { dist =>
        dist.getCount shouldBe 10
        dist.getMin shouldBe 1
        dist.getMax shouldBe 10
        dist.getSum shouldBe 55
        dist.getMean shouldBe 5.5
      }
      .distributions(_ should contain(MetricsJob.distribution.getName -> expectedDistribution))
      .gauge(MetricsJob.gauge) { latest =>
        // Only the range is asserted, not one specific value.
        latest.getValue should be >= 1L
        latest.getValue should be <= 10L
      }
      .gauges(_.map { case (_, latest) =>
        latest.getValue should be >= 1L
        latest.getValue should be <= 10L
      })
      .run()
  }

  it should "fail incorrect counter test" in {
    val thrown = the[TestFailedException] thrownBy {
      JobTest[MetricsJob.type]
        .counter(MetricsJob.counter)(_ shouldBe 100)
        .run()
    }
    thrown should have message "10 was not equal to 100"
  }

  it should "fail incorrect distribution test" in {
    val thrown = the[TestFailedException] thrownBy {
      JobTest[MetricsJob.type]
        .distribution(MetricsJob.distribution)(_.getMax shouldBe 100)
        .run()
    }
    thrown should have message "10 was not equal to 100"
  }

  // Only the message suffix is checked, not the gauge value reported in front of it.
  it should "fail incorrect gauge test" in {
    val thrown = the[TestFailedException] thrownBy {
      JobTest[MetricsJob.type]
        .gauge(MetricsJob.gauge)(_.getValue should be >= 100L)
        .run()
    }
    thrown.getMessage should endWith(" was not greater than or equal to 100")
  }

  // The subject is spelled like the object it exercises (`TransformOverride`), matching the
  // "TransformOverride.ofIter" and "TransformOverride.of" subjects used further down.
  // The source transform named "ReadInput" is replaced by the given elements.
  "TransformOverride.ofSource" should "support non-empty input" in {
    JobTest[SourceTransformOverrideJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), Seq.empty[String]) // still required for pipeline construction
      .transformOverride(
        // #JobTestTest_example_source_mock
        TransformOverride.ofSource[String]("ReadInput", List("10", "11", "12"))
        // #JobTestTest_example_source_mock
      )
      .output(TextIO("out.txt"))(_ should containInAnyOrder(List("10", "11", "12")))
      .run()
  }

  // Overriding the named source with no elements must yield an empty output.
  it should "support empty input" in {
    val noElements = List.empty[String]
    val sourceOverride = TransformOverride.ofSource[String]("ReadInput", noElements)

    JobTest[SourceTransformOverrideJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), Seq.empty[String])
      .transformOverride(sourceOverride)
      .output(TextIO("out.txt"))(_ should beEmpty)
      .run()
  }

  // Replaces the composite "myTransform" with a map from each input to the elements it
  // should emit; inputs 1 and 2 are fed in, so the entry for 3 is never consulted.
  "TransformOverride.ofIter" should "support 1-to-n inputs" in {
    JobTest[TransformOverrideIterJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), Seq("1", "2"))
      .transformOverride(
        // #JobTestTest_example_mock_iter_map
        TransformOverride.ofIter[Int, String](
          "myTransform",
          Map(1 -> Seq("10"), 2 -> Seq("20", "21"), 3 -> Seq())
        )
        // #JobTestTest_example_mock_iter_map
      )
      .output(TextIO("out.txt"))(_ should containInAnyOrder(List("10", "20", "21")))
      .run()
  }

  // Same as the map-based override, but the replacement is given as a function from each
  // input to the elements it should emit.
  it should "support 1-to-n function inputs" in {
    JobTest[TransformOverrideIterJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), Seq("1", "2", "3"))
      .transformOverride(
        // #JobTestTest_example_mock_iter_fun
        TransformOverride.ofIter[Int, String](
          "myTransform",
          // map fn equal to: Map(1 -> Seq(), 2 -> Seq("1"), 3 -> Seq("1", "2"))
          (i: Int) => { (1 until i).map(String.valueOf(_)) }
        )
        // #JobTestTest_example_mock_iter_fun
      )
      .output(TextIO("out.txt"))(_ should containInAnyOrder(List("1", "1", "2")))
      .run()
  }

  // Overrides "myTransform" with a fixed Int -> String mapping. The mapping has an extra
  // key (3) that is not part of the input and therefore must not show up in the output.
  "TransformOverride.of" should "support non-empty input" in {
    val fixedMapping = Map(1 -> "10", 2 -> "20", 3 -> "30")
    val mock = TransformOverride.of[Int, String]("myTransform", fixedMapping)
    val expected = List("10", "20")

    JobTest[TransformOverrideJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), Seq("1", "2"))
      .transformOverride(mock)
      .output(TextIO("out.txt"))(written => written should containInAnyOrder(expected))
      .run()
  }

  // Overrides "myTransform" with a function instead of a Map.
  it should "support function input" in {
    val timesTen: Int => String = n => s"${n * 10}"
    val mock = TransformOverride.of[Int, String]("myTransform", timesTen)
    val expected = List("10", "20")

    JobTest[TransformOverrideJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), Seq("1", "2"))
      .transformOverride(mock)
      .output(TextIO("out.txt"))(written => written should containInAnyOrder(expected))
      .run()
  }

  // A String-keyed Map override on "myTransform" is expected to be rejected with an
  // IllegalArgumentException.
  it should "fail with an incorrect override input type" in {
    // Everything, including building the override, happens inside the asserted block.
    def runJob() =
      JobTest[TransformOverrideJob.type]
        .args("--input=in.txt", "--output=out.txt")
        .input(TextIO("in.txt"), Seq("1", "2"))
        .transformOverride(
          TransformOverride.of[String, String]("myTransform", Map("10" -> "10"))
        )
        .output(TextIO("out.txt"))(_ should containInAnyOrder(List("10", "20")))
        .run()

    an[IllegalArgumentException] should be thrownBy {
      try runJob()
      catch {
        // Surface the underlying cause; fall back to the wrapper when no cause is set.
        case wrapped: PipelineExecutionException =>
          val underlying = Option(wrapped.getCause).getOrElse(wrapped)
          throw underlying
      }
    }
  }

  // A String-taking function override on "myTransform" is expected to be rejected with an
  // IllegalArgumentException, just like the Map-based variant.
  it should "fail with an incorrect function override input type" in {
    // Everything, including building the override, happens inside the asserted block.
    def runJob() =
      JobTest[TransformOverrideJob.type]
        .args("--input=in.txt", "--output=out.txt")
        .input(TextIO("in.txt"), Seq("1", "2"))
        .transformOverride(
          TransformOverride.of[String, String](
            "myTransform",
            (raw: String) => s"${raw.toInt * 10}"
          )
        )
        .output(TextIO("out.txt"))(_ should containInAnyOrder(List("10", "20")))
        .run()

    an[IllegalArgumentException] should be thrownBy {
      try runJob()
      catch {
        // Surface the underlying cause; fall back to the wrapper when no cause is set.
        case wrapped: PipelineExecutionException =>
          val underlying = Option(wrapped.getCause).getOrElse(wrapped)
          throw underlying
      }
    }
  }

  // Documents (rather than prescribes) today's behavior: a mismatch in the generic
  // parameter of the override's input type surfaces as a ClassCastException.
  // Ideally this would be an IllegalArgumentException as well.
  it should "fail differently with an incorrect override generic input type" in {
    // Everything, including building the override, happens inside the asserted block.
    def runJob() =
      JobTest[TransformOverrideKVJobFail.type]
        .args("--input=in.txt", "--output=out.txt")
        .input(TextIO("in.txt"), Seq("1", "2"))
        .transformOverride(
          TransformOverride.of[KV[String, BaseAsyncLookupDoFn.Try[Int]], String](
            "myTransform",
            (kv: KV[String, BaseAsyncLookupDoFn.Try[Int]]) => s"${kv.getValue.get() * 10}"
          )
        )
        .output(TextIO("out.txt"))(_ should containInAnyOrder(List("10", "20")))
        .run()

    an[ClassCastException] should be thrownBy {
      try runJob()
      catch {
        // Surface the underlying cause; fall back to the wrapper when no cause is set.
        case wrapped: PipelineExecutionException =>
          val underlying = Option(wrapped.getCause).getOrElse(wrapped)
          throw underlying
      }
    }
  }

  // Documents (rather than prescribes) today's behavior: an override producing Int where
  // the test expects String output surfaces as a ClassCastException.
  // Ideally this would be an IllegalArgumentException.
  it should "fail with an incorrect override output type" in {
    // Everything, including building the override, happens inside the asserted block.
    def runJob() =
      JobTest[TransformOverrideJob.type]
        .args("--input=in.txt", "--output=out.txt")
        .input(TextIO("in.txt"), Seq("1", "2"))
        .transformOverride(
          TransformOverride.of[Int, Int]("myTransform", Map(1 -> 10, 2 -> 20, 3 -> 30))
        )
        .output(TextIO("out.txt"))(_ should containInAnyOrder(List("10", "20")))
        .run()

    an[ClassCastException] should be thrownBy {
      try runJob()
      catch {
        // Surface the underlying cause; fall back to the wrapper when no cause is set.
        case wrapped: PipelineExecutionException =>
          val underlying = Option(wrapped.getCause).getOrElse(wrapped)
          throw underlying
      }
    }
  }

  // Overrides "myTransform" with a Map whose values are wrapped in BaseAsyncLookupDoFn.Try.
  "TransformOverride.ofKV" should "support non-empty input" in {
    val lookups = Map(1 -> "10", 2 -> "20", 3 -> "30")
      .map { case (key, value) => key -> new BaseAsyncLookupDoFn.Try(value) }
    val mock = TransformOverride.ofKV[Int, BaseAsyncLookupDoFn.Try[String]]("myTransform", lookups)
    val expected = List("10", "20")

    JobTest[TransformOverrideKVJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), Seq("1", "2"))
      .transformOverride(mock)
      .output(TextIO("out.txt"))(written => written should containInAnyOrder(expected))
      .run()
  }

  // Overrides "myTransform" with a function that wraps its result in
  // BaseAsyncLookupDoFn.Try, instead of supplying a Map.
  // (Test name previously misspelled "support" as "suppport".)
  it should "support a KV function override" in {
    JobTest[TransformOverrideKVJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), Seq("1", "2"))
      .transformOverride(
        TransformOverride.ofKV[Int, BaseAsyncLookupDoFn.Try[String]](
          "myTransform",
          (i: Int) => new BaseAsyncLookupDoFn.Try(s"${i * 10}")
        )
      )
      .output(TextIO("out.txt"))(_ should containInAnyOrder(List("10", "20")))
      .run()
  }

  // Overrides "myTransform" with a Map from each key to zero or more Try-wrapped values.
  // Key 4 is not part of the input, so its value must not show up in the output.
  "TransformOverride.ofIterKV" should "support non-empty input" in {
    val lookups = Map(1 -> Seq(), 2 -> Seq("20", "21"), 3 -> Seq("30"), 4 -> Seq("40"))
      .map { case (key, values) => key -> values.map(new BaseAsyncLookupDoFn.Try(_)) }
    val expected = List("20", "21", "30")

    JobTest[TransformOverrideIterKVJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), Seq("1", "2", "3"))
      .transformOverride(
        TransformOverride.ofIterKV[Int, BaseAsyncLookupDoFn.Try[String]]("myTransform", lookups)
      )
      .output(TextIO("out.txt"))(written => written should containInAnyOrder(expected))
      .run()
  }

  // Same as the Map-based ofIterKV override, but the 1-to-n mapping is a function.
  it should "support a 1-to-n KV function input" in {
    val lines = Seq("1", "2", "3")
    val expected = List("1", "1", "2")

    JobTest[TransformOverrideIterKVJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), lines)
      .transformOverride(
        TransformOverride.ofIterKV[Int, BaseAsyncLookupDoFn.Try[String]](
          "myTransform",
          // map fn equal to: Map(1 -> Seq(), 2 -> Seq(Try("1")), 3 -> Seq(Try("1"), Try("2")))
          (n: Int) => (1 until n).map(String.valueOf).map(new BaseAsyncLookupDoFn.Try(_))
        )
      )
      .output(TextIO("out.txt"))(written => written should containInAnyOrder(expected))
      .run()
  }

  // Overrides "myTransform" with a plain Int -> String Map; unlike the ofKV tests, the
  // values here are not wrapped in BaseAsyncLookupDoFn.Try by the test.
  "TransformOverride.ofAsyncLookup" should "support non-empty input" in {
    val lines = Seq("1", "2")
    val expected = List("10", "20")

    JobTest[TransformOverrideKVJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), lines)
      // #JobTestTest_example_mock_kv_map
      .transformOverride(
        TransformOverride.ofAsyncLookup[Int, String](
          "myTransform",
          Map(1 -> "10", 2 -> "20", 3 -> "30")
        )
      )
      // #JobTestTest_example_mock_kv_map
      .output(TextIO("out.txt"))(_ should containInAnyOrder(expected))
      .run()
  }

  // Same as the Map-based ofAsyncLookup override, but the lookup is a function.
  it should "pass with an AsyncLookup function override" in {
    val lines = Seq("1", "2")
    val expected = List("10", "20")

    JobTest[TransformOverrideKVJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), lines)
      .transformOverride(
        // #JobTestTest_example_mock_kv_fun
        TransformOverride.ofAsyncLookup[Int, String](
          "myTransform",
          (i: Int) => s"${i * 10}"
        )
        // #JobTestTest_example_mock_kv_fun
      )
      .output(TextIO("out.txt"))(_ should containInAnyOrder(expected))
      .run()
  }

  // Overrides "myTransform" with a function returning zero or more plain values per input.
  "TransformOverride.ofIterAsyncLookup" should "support a 1-to-n AsyncLookup function override" in {
    val lines = Seq("1", "2", "3")
    val expected = List("1", "1", "2")

    JobTest[TransformOverrideIterKVJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), lines)
      .transformOverride(
        TransformOverride.ofIterAsyncLookup[Int, String](
          "myTransform",
          // map fn equal to: Map(1 -> Seq(), 2 -> Seq("1"), 3 -> Seq("1", "2"))
          (n: Int) => { (1 until n).map(String.valueOf(_)) }
        )
      )
      .output(TextIO("out.txt"))(written => written should containInAnyOrder(expected))
      .run()
  }

  // Map-based variant of the 1-to-n AsyncLookup override. Key 3 is not part of the
  // input, so its value must not show up in the output.
  it should "support a 1-to-n AsyncLookup override" in {
    val lookups = Map(1 -> Seq(), 2 -> Seq("10", "20"), 3 -> Seq("30"))
    val expected = List("10", "20")

    JobTest[TransformOverrideIterKVJob.type]
      .args("--input=in.txt", "--output=out.txt")
      .input(TextIO("in.txt"), Seq("1", "2"))
      .transformOverride(
        TransformOverride.ofIterAsyncLookup[Int, String]("myTransform", lookups)
      )
      .output(TextIO("out.txt"))(written => written should containInAnyOrder(expected))
      .run()
  }
}
